from pathlib import Path

import numpy as np
import pandas as pd


MISSING_CODES = {7, 8, 9, 77, 88, 99, 777, 888, 999, 7777, 8888, 9999}


def clean_numeric(series: pd.Series) -> pd.Series:
    s = pd.to_numeric(series, errors="coerce")
    s = s.mask(s.isin(MISSING_CODES))
    return s


def zscore(series: pd.Series) -> pd.Series:
    s = pd.to_numeric(series, errors="coerce")
    m = s.mean()
    sd = s.std(ddof=0)
    if pd.isna(sd) or sd <= 0:
        return s * 0
    return (s - m) / sd


def main() -> None:
    root = Path(__file__).resolve().parents[1]
    out_path = root / "data_external" / "bartik_region_year.csv"
    out_path.parent.mkdir(parents=True, exist_ok=True)

    ess = pd.read_parquet(root / "outputs" / "ess_r8_r11_min.parquet")
    cost = pd.read_csv(root / "data_external" / "cost_shifters_region_2015_eurostat.csv")
    wb = pd.read_csv(root / "data_external" / "broadband_country_year_worldbank.csv")

    ess = ess[ess["regunit"].isin([1, 2])].copy()
    ess["region"] = ess["region"].astype(str).str.upper().str.strip()
    ess = ess[ess["region"] != "99999"].copy()
    ess["survey_year"] = pd.to_numeric(ess["survey_year"], errors="coerce").astype("Int64")

    cost["region"] = cost["region"].astype(str).str.upper().str.strip()
    wb["year"] = pd.to_numeric(wb["year"], errors="coerce").astype("Int64")
    wb["fixed_broadband_per100"] = pd.to_numeric(wb["fixed_broadband_per100"], errors="coerce")

    # Country-year shock: change from baseline 2015 (or earliest available for that country).
    base_year = 2015
    wb_base = (
        wb[wb["year"] == base_year][["cntry", "fixed_broadband_per100"]]
        .rename(columns={"fixed_broadband_per100": "fixed_bb_base_2015"})
        .copy()
    )
    wb2 = wb.merge(wb_base, on="cntry", how="left")
    # Fallback to earliest available per country if 2015 missing.
    earliest = (
        wb.sort_values(["cntry", "year"])
        .groupby("cntry", as_index=False)
        .first()[["cntry", "fixed_broadband_per100"]]
        .rename(columns={"fixed_broadband_per100": "fixed_bb_base_earliest"})
    )
    wb2 = wb2.merge(earliest, on="cntry", how="left")
    wb2["fixed_bb_base"] = wb2["fixed_bb_base_2015"].where(wb2["fixed_bb_base_2015"].notna(), wb2["fixed_bb_base_earliest"])
    wb2["shock_fixed_bb"] = wb2["fixed_broadband_per100"] - wb2["fixed_bb_base"]

    # Keep only survey years present in ESS sample
    years = sorted(int(y) for y in ess["survey_year"].dropna().unique().tolist())
    wb2 = wb2[wb2["year"].isin(years)].copy()

    # Build region-year rows for ESS regions in those years
    regions = sorted(ess["region"].dropna().unique().tolist())
    base = pd.MultiIndex.from_product([regions, years], names=["region", "year"]).to_frame(index=False)
    base["cntry"] = base["region"].str[:2]

    base = base.merge(cost, on="region", how="left")
    base = base.merge(wb2[["cntry", "year", "shock_fixed_bb"]], on=["cntry", "year"], how="left")

    # Bartik instruments (supply-cost shifters × country-year shock)
    base["bartik_density"] = base["pop_density_2015_per_km2"] * base["shock_fixed_bb"]
    base["bartik_area"] = base["area_2015_km2"] * base["shock_fixed_bb"]
    base["bartik_pop"] = base["pop_2015"] * base["shock_fixed_bb"]

    # Standardize
    base["z_bartik_density"] = zscore(base["bartik_density"])
    base["z_bartik_area"] = zscore(base["bartik_area"])
    base["z_bartik_pop"] = zscore(base["bartik_pop"])

    out = base.rename(columns={"year": "survey_year"})
    out.to_csv(out_path, index=False, encoding="utf-8")
    print(f"Wrote: {out_path} rows={len(out):,} regions={out['region'].nunique():,} years={out['survey_year'].nunique():,}")


if __name__ == "__main__":
    main()

